-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for optional Kafka properties from external file #8743
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are missing. Can you please add a case where configuration from resource file is used and we can see the changed behavior from tests?
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/SslKafkaAdminFactory.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaProducerFactory.java
Outdated
Show resolved
Hide resolved
100e9a8
to
ee0927e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to come up with a testing, to make sure that it is actually used in all places? Do you have any idea how it can be tested?
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/DefaultTextKafkaProducerFactory.java
Outdated
Show resolved
Hide resolved
Hello @kokosing ! Yes, I'm currently in the process of implementing a new product test and let the tests utilize the new config. |
I does have to be a product test. Maybe an unit test? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have tests that use actual values in the external resource configs so that we can understand how they interact with the different security protocols and trino config options.
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSecurityConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaConsumerFactory.java
Outdated
Show resolved
Hide resolved
ee0927e
to
4ad54c1
Compare
I was thinking of having a product test that runs purely on If we want to go forward with Unit tests, perhaps I can add 2 new tests in
WDYT @kokosing ? |
5c6d725
to
b49cac1
Compare
4c48b79
to
057628b
Compare
13df6f8
to
8911101
Compare
Hi! In our project we use Confluent and want to produce from Trino to its topics. Confluent supports SASL method of authentication, so we are interested in this PR. Are there any blockers for it be released? |
3f26668
to
12793f4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are working for me! Very cool klDen!
@kokosing any chance you can get this approved and push it in?? :) would love to have this in the official builds. Thanks! |
We are waiting for this as well... Any update here team? |
I note there are a huge number of open PRs, how long do reviews and merges usually take? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rebase. It is looks nice. Just few comments.
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaConsumerFactory.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/ConfigurationUtils.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/ConfigurationUtils.java
Outdated
Show resolved
Hide resolved
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/utils/ConfigurationUtils.java
Outdated
Show resolved
Hide resolved
Hi All! Any updates here?? I'm not exactly sure who we are waiting for at this point. Thanks team! |
Hey. I'll resolve the remaining comments this Sunday. |
@@ -38,19 +40,30 @@ private void installClientModule(SecurityProtocol securityProtocol, Module modul | |||
{ | |||
install(conditionalModule( | |||
KafkaSecurityConfig.class, | |||
config -> config.getSecurityProtocol().equals(securityProtocol), | |||
config -> config.getSecurityProtocol().orElse(null) == securityProtocol, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use Optional#filter(SecurityProtocol::equals).isPresent()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applying this fails TestKafkaPlugin
tests since securityProtocol
is nullable and calling security::equals
throws NullPointerException
.
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
Outdated
Show resolved
Hide resolved
@@ -27,12 +28,11 @@ | |||
|
|||
public class KafkaSecurityConfig | |||
{ | |||
private SecurityProtocol securityProtocol = PLAINTEXT; | |||
private SecurityProtocol securityProtocol; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have here as Optional<SecurityProtocol>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting it as Optional<SecurityProtocol>
breaks the unit test since it expects an Optional.empty
and actual is null
.
public class TestKafkaSecurityConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(KafkaSecurityConfig.class)
.setSecurityProtocol(null));
The signature public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol)
accepts a plain SecurityProtocol
object and not an Optional<SecurityProtocol>
one.
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/PlainTextKafkaAdminFactory.java
Outdated
Show resolved
Hide resolved
34bb045
to
fde302c
Compare
0eca91c
to
9bb94b3
Compare
9bb94b3
to
0a37bb9
Compare
The failing ci maven-checks is unrelated to my changes: 2022-03-16T06:45:25.760Z ERROR main io.trino.server.Server io.trino.spi.Plugin: Provider io.trino.plugin.deltalake.DeltaLakePlugin not found
java.util.ServiceConfigurationError: io.trino.spi.Plugin: Provider io.trino.plugin.deltalake.DeltaLakePlugin not found
at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:589)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1212)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
at com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:275)
at com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:239)
at io.trino.server.PluginManager.loadPlugin(PluginManager.java:166)
at io.trino.server.PluginManager.loadPlugin(PluginManager.java:157)
at io.trino.server.ServerPluginsProvider.lambda$loadPlugins$1(ServerPluginsProvider.java:58)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at java.base/java.util.concurrent.ExecutorCompletionService.submit(ExecutorCompletionService.java:184)
at io.trino.util.Executors.executeUntilFailure(Executors.java:41)
at io.trino.server.ServerPluginsProvider.loadPlugins(ServerPluginsProvider.java:53)
at io.trino.server.PluginManager.loadPlugins(PluginManager.java:137)
at io.trino.server.Server.doStart(Server.java:126)
at io.trino.server.Server.lambda$start$0(Server.java:80)
at io.trino.$gen.Trino_c116fb3____20220316_064516_1.run(Unknown Source)
at io.trino.server.Server.start(Server.java:80)
at io.trino.server.TrinoServer.main(TrinoServer.java:38)
🚨 Failed to execute a query after Trino container started
4d094b0ad9bc0319bdf797e2e0c6557378b8abc3d65fc6fee44f031c64fb3c20
Error: Process completed with exit code 1.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rebase and squash commits. There is also last comment from me.
0a37bb9
to
be4b572
Compare
Squashed! |
@Praveen2112 or @lukasz-walkiewicz do you want to take a look? |
Merged, thanks! |
The external file doesn't support referencing secrets since it's not passed through via Bootstrap probably. Not sure if this can be implemented however. |
Solves #6439 and #8197